package com.newegg.hardware.benchmark.elastic;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Consumer;

import org.apache.log4j.Logger;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.http.converter.StringHttpMessageConverter;
import org.springframework.web.client.RestTemplate;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

public class ElasticClient {
	static Logger logger = Logger.getLogger(ElasticClient.class);
	static RestTemplate restTemplate;
	String elasticSearchRestUrl;
	
	static {
		restTemplate = new RestTemplate();
		restTemplate.getMessageConverters().set(1, new StringHttpMessageConverter(Charset.forName("UTF-8")));
	}
	
	ArrayBlockingQueue<ElasticRequest> queue = new ArrayBlockingQueue<ElasticRequest>(200);
	
	public ElasticClient(String elasticSearchRestUrl) {
		this.elasticSearchRestUrl = elasticSearchRestUrl;
		new Timer(true).schedule(new TimerTask() {
			@Override
			public void run() {
				pullThread();
			}
		}, 5000, 5000);
	}

	public static ElasticClient get(String elasticSearchRestUrl) {
		return new ElasticClient(elasticSearchRestUrl);
	}

	public int size() {
		return queue.size();
	}
	
	/**
	 * 随机获取一个 elasticSearch 的节点
	 */
	private String getRestUrl() {
		String[] urls = elasticSearchRestUrl.split(",");
		return "http://" + urls[ThreadLocalRandom.current().nextInt(0, urls.length)];
	}
	
	boolean requesting = false;
	void pullThread() {
		if(queue == null || queue.size() == 0 || this.requesting) { return; }
		this.requesting = true;
		try {
			List<ElasticRequest> reqs = null;
			synchronized (queue) {
				reqs = new ArrayList<>(queue);
				do {
					try {
						reqs = insertData(reqs);
					} catch (Exception e) {
						List<ElasticRequest> q1 = new ArrayList<>(reqs.subList(0, reqs.size() / 2));
						reqs.removeAll(q1);
						q1 = insertData(q1);
						List<ElasticRequest> q2 = insertData(reqs);
						reqs = new ArrayList<>();
						if(q1 != null && q1.size() > 0) { reqs.addAll(q1); }
						if(q2 != null && q2.size() > 0) { reqs.addAll(q2); }
					}
					if(reqs != null && reqs.size() > 0) { try { Thread.sleep(2000); } catch (InterruptedException e) { } }
				} while (reqs != null && reqs.size() > 0);
				queue.clear();
			}
		} finally {
			this.requesting = false;
		}
    }
	
	public void insert(ElasticRequest request) {
		synchronized (queue) {
			try { queue.put(request); } catch (InterruptedException e) { e.printStackTrace(); }
		}
		if(queue.size() >= 100) { pullThread(); }
	}

	List<ElasticRequest> insertData(List<ElasticRequest> requests){
		if(requests == null || requests.size() == 0) { return null; }
		StringBuilder bulkData = new StringBuilder();
		for (ElasticRequest req : requests) {
			bulkData.append("{ \"index\" : { \"_index\" : \"" + req.index() + "\", \"_id\" : \"" + req.key() + "\" } }\n");
			bulkData.append(JSON.toJSONString(req.getData())).append("\n");
		}
		HttpHeaders headers = new HttpHeaders();
		headers.add("Content-Type", MediaType.APPLICATION_JSON_VALUE);
		HttpEntity<String> formEntity = new HttpEntity<String>(bulkData.toString(), headers);
		ResponseEntity<String> response = null;
		try { response = restTemplate.exchange(this.getRestUrl() + "/_bulk", HttpMethod.POST, formEntity, String.class); } catch (Exception e) { }
		if(response == null || response.getStatusCode() != HttpStatus.OK) {
			List<ElasticRequest> faild = new ArrayList<>();
			for (ElasticRequest req : requests) {
				if(req.retry() > 0) {
					req.setRetry(req.retry() - 1);
					faild.add(req);
				}else {
					logger.error(response == null ? "Failed Query" : response.getStatusCode().name() + ": data request error:" + JSON.toJSONString(req));
				}
			}
			return faild; 
		}
		JSONObject json = JSON.parseObject(response.getBody());
		if (json.containsKey("error") || (json.containsKey("errors") && json.getBoolean("errors"))) {
			List<ElasticRequest> faild = new ArrayList<ElasticRequest>();
			Map<String, ElasticRequest> requestMap = requestToMap(requests);
			JSONArray array = json.getJSONArray("items");
			forEach(array, item->{
				JSONObject itemindex = item.getJSONObject("index");
				if(itemindex.getIntValue("status") > 300 || itemindex.containsKey("error")) {
					ElasticRequest request = requestMap.get(itemindex.getString("_index") + ":" + itemindex.getString("_id"));
					if(request!= null && request.retry() > 0) {
						faild.add(request.setRetry(request.retry() - 1));
					}else {
						logger.error("data insert error:" + JSON.toJSONString(request) + ":" + itemindex.toJSONString());
					}
				}
			});
			return faild;
		}
		return null;
	}
	
	public void update(ElasticRequest request) throws Exception {
		if(request == null) { return; }
		StringBuilder bulkData = new StringBuilder();
		bulkData.append("{ \"doc\" : " + JSON.toJSONString(request.getData()) + "}\n");
		HttpHeaders headers = new HttpHeaders();
		headers.add("Content-Type", MediaType.APPLICATION_JSON_VALUE);
		HttpEntity<String> formEntity = new HttpEntity<String>(bulkData.toString(), headers);
		ResponseEntity<String> response = restTemplate.exchange(this.getRestUrl() + "/" + request.index() + "/_update/" + request.key(), HttpMethod.POST, formEntity, String.class);
		if(response.getStatusCode() != HttpStatus.OK) { throw new RuntimeException(response.getBody()); }
		JSONObject json = JSON.parseObject(response.getBody());
		if (json.containsKey("error") || (json.containsKey("errors") && json.getBoolean("errors"))) {
			throw new RuntimeException("IndexDocument failed! errors: " + json.toString());
		}
	}
	
	public JSONObject search(String index, String params) {
		HttpHeaders headers = new HttpHeaders();
		headers.add("Content-Type", MediaType.APPLICATION_JSON_VALUE);
		HttpEntity<String> formEntity = new HttpEntity<String>(params, headers);
		ResponseEntity<String> response = restTemplate.exchange(this.getRestUrl() + "/" + index + "/_search", HttpMethod.POST, formEntity, String.class);
		if(response.getStatusCode() != HttpStatus.OK) { throw new RuntimeException(response.getBody()); }
		return JSON.parseObject(response.getBody());
	}
	
	public JSONObject get(String index, String key) {
		ResponseEntity<String> response = restTemplate.getForEntity(this.getRestUrl() + "/" + index + "/_search?q=_id:" + key, String.class);
		if(response.getStatusCode() != HttpStatus.OK) { throw new RuntimeException(response.getBody()); }
		return JSON.parseObject(response.getBody());
	}
	
	static void forEach(JSONArray arrays, Consumer<JSONObject> accept) {
		for (int i = 0; i < arrays.size(); i++) {
			JSONObject object = arrays.getJSONObject(i);
			accept.accept(object);
		}
	}
	
	static Map<String, ElasticRequest> requestToMap(List<ElasticRequest> requests){
		Map<String, ElasticRequest> map = new HashMap<String, ElasticRequest>();
		for (ElasticRequest req : requests) {
			map.put(req.index() + ":" + req.key(), req);
		}
		return map;
	}
}
